AWS Glue 中的 ETL 的连接类型和选项

您所在的位置:网站首页 mongodb uri可选参数 AWS Glue 中的 ETL 的连接类型和选项

AWS Glue 中的 ETL 的连接类型和选项

2023-10-06 22:20| 来源: 网络整理| 查看: 265

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

AWS Glue 中的 ETL 的连接类型和选项

在AWS Glue,各种 PySpark 和 Scala 方法和转换使用connectionType参数指定连接类型。它们使用 connectionOptions 或 options 参数指定连接选项。

connectionType 参数可以采用下表中显示的值。下面几个部分介绍每种类型所关联的 connectionOptions(或 options)参数值。除非另有说明,否则这些参数会在连接用作源或接收器时使用。

有关展示如何设置和使用连接选项的代码示例,请参阅示例:设置连接类型和选项。

connectionType 连接到 custom.* Spark、Athena 或 JDBC 数据存储(请参阅自定义和 AWS Marketplace connectionType 值) documentdb Amazon DocumentDB (with MongoDB compatibility) 数据库 dynamodb Amazon DynamoDB 数据库 kafka Kafka 或 Amazon Managed Streaming for Apache Kafka kinesis Amazon Kinesis Data Streams marketplace.* Spark、Athena 或 JDBC 数据存储(请参阅自定义和 AWS Marketplace connectionType 值) mongodb MongoDB 数据库 mysql MySQL 数据库(请参阅JDBC connectionType 值) oracle Oracle 数据库(请参阅JDBC connectionType 值) orc Amazon Simple Storage Service(Amazon S3)中以 Apache Hive 优化的行列式(ORC)文件格式存储的文件 parquet Amazon S3 中以 Apache Parquet 文件格式存储的文件 postgresql PostgreSQL 数据库(请参阅JDBC connectionType 值) redshift Amazon Redshift 数据库(请参阅Redshift 连接) S3 Amazon S3 sqlserver Microsoft SQL Server 数据库(请参阅JDBC connectionType 值) "connectionType": "documentdb"

指定与 Amazon DocumentDB (with MongoDB compatibility) 的连接。

源连接和接收器连接的连接选项不同。

"connectionType": "documentdb" as Source

将 "connectionType": "documentdb" 用作源时可使用以下连接选项:

"uri":(必需)要从中读取数据的 Amazon DocumentDB 主机,格式为 mongodb://:。

"database":(必需)要从中读取数据的 Amazon DocumentDB 数据库。

"collection":(必需)要从中读取数据的 Amazon DocumentDB 连接。

"username":(必需)Amazon DocumentDB 用户名。

"password":(必需)Amazon DocumentDB 密码。

"ssl":(如果使用 SSL,则必需)如果您的连接使用 SSL,则必须包含此选项且值为 "true"。

"ssl.domain_match":(如果使用 SSL,则必需)如果您的连接使用 SSL,则必须包含此选项且值为 "false"。

"batchSize":(可选)每个批处理返回的文档数量,在内部批处理的游标中使用。

"partitioner":(可选)从 Amazon DocumentDB 中读取输入数据的分区器的类名称。该连接器提供以下分区器:

MongoDefaultPartitioner (默认值)

MongoSamplePartitioner

MongoShardedPartitioner

MongoSplitVectorPartitioner

MongoPaginateByCountPartitioner

MongoPaginateBySizePartitioner

"partitionerOptions":(可选)指定分区器的选项。各个分区器支持的选项如下:

MongoSamplePartitioner: partitionKey, partitionSizeMB, samplesPerPartition

MongoShardedPartitioner: shardkey

MongoSplitVectorPartitioner:partitionKey、partitionSizeMB

MongoPaginateByCountPartitioner: partitionKey, numberOfPartitions

MongoPaginateBySizePartitioner:partitionKey、partitionSizeMB

有关这些选项的更多信息,请参阅 MongoDB 文档中的分区器配置。有关示例代码,请参阅 示例:设置连接类型和选项。

"connectionType": "documentdb" as Sink

将 "connectionType": "documentdb" 用作连接器时可使用以下连接选项:

"uri":(必需)要在其中写入数据的 Amazon DocumentDB 主机,格式为 mongodb://:。

"database":(必需)要在其中写入数据的 Amazon DocumentDB 数据库。

"collection":(必需)要在其中写入数据的 Amazon DocumentDB 连接。

"username":(必需)Amazon DocumentDB 用户名。

"password":(必需)Amazon DocumentDB 密码。

"extendedBsonTypes":(可选)如果为 true,则在 Amazon DocumentDB 中写入数据时会启用扩展 BSON 类型。默认为 true。

"replaceDocument":(可选)如果为 true,则在保存包含 _id 字段的数据集时会替换整个文档。如果为 false,则只会更新文档中与数据集中的字段匹配的字段。默认为 true。

"maxBatchSize":(可选)保存数据时的批量操作的最大批次大小。默认值为 512。

有关示例代码,请参阅 示例:设置连接类型和选项。

"connectionType": "dynamodb"

指定与 Amazon DynamoDB 的连接。

源连接和接收器连接的连接选项不同。

"connectionType": "dynamodb" with the ETL connector as Source

在使用 AWS Glue DynamoDB ETL 连接器时,请使用以下连接选项并将 "connectionType": "dynamodb" 作为源:

"dynamodb.input.tableName":(必需)要从中读取数据的 DynamoDB 表格。

"dynamodb.throughput.read.percent":(可选)要使用的读取容量单位 (RCU) 的百分比。默认设置为“0.5”。可接受的值从“0.1”到“1.5”,包含这两个值。

0.5 表示默认读取速率,这意味着 AWS Glue 将尝试占用表的一半的读取容量。如果增加值超过 0.5,AWS Glue 将增加请求速率;将值降低到 0.5 以下将降低读取请求速率。(实际读取速率取决于 DynamoDB 表中是否存在统一键分配的等因素。)

当 DynamoDB 表处于按需模式时,AWS Glue 处理表的读取容量为 40000。要导出大型表,我们建议您将 DynamoDB 表切换为按需模式。

"dynamodb.splits":(可选)定义在读取时将此 DynamoDB 表分成多少个部分。默认设置为“1”。可接受的值从“1”到“1,000,000”,包含这两个值。

1 表示没有并行度。我们强烈建议您使用以下公式指定更大的值以获得更好的性能。

我们建议您使用以下公式计算 numSlots,并将其用作 dynamodb.splits。如果您需要更高的性能,我们建议您增加 DPU 数量以扩展任务。

工件数量(NumberOfWorkers)在作业配置中设置。有关更多信息,请参阅在 AWS Glue 中添加作业:启用自动扩展时,作业的可用工件数量可能会因工作负载而调整。就上下文而言,为 Spark 驱动程序保留了一个执行程序;其他执行程序用于处理数据。

numExecutors =

NumberOfWorkers - 1,如果 WorkerType 为 G.1X 或 G.2X

MaximumCapacity * 2 - 1 如果 WorkerType 是 Standard 且 AWS Glue 版本是 2.0 以上。

(MaximumCapacity - 1) * 2 - 1 如果 WorkerType 是 Standard 且 AWS Glue 版本是 1.0 及以下。

numSlotsPerExecutor =

Glue 3.0+

4,如果 WorkerType 为 Standard 或 G.1X

8,如果 WorkerType 为 G.2X

Glue 2.0 and legacy versions

4,如果 WorkerType 为 Standard

8,如果 WorkerType 为 G.1X

16,如果 WorkerType 为 G.2X

numSlots = numSlotsPerExecutor * numExecutors

"dynamodb.sts.roleArn":(可选)用于跨账户访问的 IAM 角色 ARN。此参数适用于 AWS Glue 1.0 或更高版本。

"dynamodb.sts.roleSessionName":(可选)STS 会话名称。默认设置为 “glue-dynamodb-read-sts-sessession”。此参数适用于 AWS Glue 1.0 或更高版本。

以下代码示例演示了如何从 DynamoDB 表中读取(通过 ETL 连接器)以及向其写入数据。它们演示了如何从一个表读取数据并将数据写入其他表。

Python import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={"dynamodb.input.tableName": "test_source", "dynamodb.throughput.read.percent": "1.0", "dynamodb.splits": "100" } ) print(dyf.getNumPartitions()) glue_context.write_dynamic_frame_from_options( frame=dyf, connection_type="dynamodb", connection_options={"dynamodb.output.tableName": "test_sink", "dynamodb.throughput.write.percent": "1.0" } ) job.commit() Scala import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.input.tableName" -> "test_source", "dynamodb.throughput.read.percent" -> "1.0", "dynamodb.splits" -> "100" )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) val dynamoDbSink: DynamoDbDataSink = glueContext.getSinkWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.output.tableName" -> "test_sink", "dynamodb.throughput.write.percent" -> "1.0" )) ).asInstanceOf[DynamoDbDataSink] dynamoDbSink.writeDynamicFrame(dynamicFrame) Job.commit() } } 注意

AWS Glue 支持从其他 AWS 账户的 DynamoDB 表读取数据。有关更多信息,请参阅跨账户、跨区域访问 DynamoDB 表:

注意

DynamoDB ETL 读取器不支持筛选条件或下推谓词。

"connectionType": "dynamodb" with the AWS Glue DynamoDB export connector as Source

除了AWS Glue DnamoDB ETL 连接器还AWS Glue提供了一个DynamoDB 导出连接器,该连接器会调用 DynamoDBExportTableToPointInTime 请求并以 Dyn amoDB JSON 格式将其存储在您提供的 Amazon S3 位置。 AWS Glue然后通过从Amazon S3 导出位置读取数据创建 DynamicFrame 对象。

在 DynamoDB 表大小超过 80 GB 时,导出连接器的性能优于 ETL 连接器。此外,鉴于导出请求在 AWS Glue 任务中的 Spark 进程之外执行,您可以启用 AWS Glue 任务的弹性伸缩以节省导出请求期间的 DPU 使用量。借助导出连接器,您也无需为 Spark 执行程序并行度或 DynamoDB 吞吐量读取百分比配置拆分数。

在使用 AWS Glue DynamoDB 导出连接器(仅适用于 AWS Glue 版本 2.0 以上)时,使用以下连接选项并将 "connectionType": "dynamodb"用作源:

"dynamodb.export":(必需)字符串值:

如果设置为 ddb,将启用 AWS Glue DynamoDB 导出连接器,其中在 AWS Glue 任务期间将调用新的 ExportTableToPointInTimeRequest。新的导出将通过从 dynamodb.s3.bucket 和 dynamodb.s3.prefix 传递的位置生成。

如果设置为 s3,将启用 AWS Glue DynamoDB 导出连接器但会跳过创建新的 DynamoDB 导出,而使用 dynamodb.s3.bucket 和 dynamodb.s3.prefix 作为该表以前导出的 Simple Storage Service (Amazon S3) 位置。

"dynamodb.tableArn":(必需)要从中读取数据的 DynamoDB 表格。

"dynamodb.unnestDDBJson":(可选)采用布尔值。如果设置为 true(真),则对导出中存在的 DynamoDB JSON 结构执行解除嵌套转换。默认值设置为 false。

"dynamodb.s3.bucket":(可选)指示将会执行 DynamoDB ExportTableToPointInTime 进程的 Amazon S3 存储桶位置。导出的文件格式为 DynamoDB JSON。

"dynamodb.s3.prefix":(可选)指示将用于存储 DynamoDB ExportTableToPointInTime 负载的 Amazon S3 存储桶内的 Amazon S3 前缀位置。如果既未指定 dynamodb.s3.prefix,也未指定 dynamodb.s3.bucket,则这些值将默认为 AWS Glue 任务配置中指定的临时目录位置。有关更多信息,请参阅 AWS Glue 使用的特殊参数。

"dynamodb.s3.bucketOwner":指示跨账户 Amazon S3 访问所需的存储桶拥有者。

"dynamodb.sts.roleArn":(可选)跨账户访问和/或跨区域访问 DynamoDB 表时将会代入的 IAM 角色 ARN。注意:相同的 IAM 角色 ARN 将用于访问为 ExportTableToPointInTime 请求指定的 Amazon S3 位置。

"dynamodb.sts.roleSessionName":(可选)STS 会话名称。默认设置为 “glue-dynamodb-read-sts-sessession”。

注意

DynamoDB 对调用 ExportTableToPointInTime 请求有特定的要求。有关更多信息,请参阅在 DynamoDB 中请求表导出。例如,表需要启用时间点恢复 (PITR) 才能使用此连接器。DynamoDB 连接器还支持在将 DynamoDB 导出到 Amazon S3 时进行 AWS KMS 加密。在 AWS Glue 任务配置中指定安全性配置,将为 DynamoDB 导出启用 AWS KMS 加密。KMS 密钥必须与 Simple Storage Service (Amazon S3) 存储桶位于同一区域。

请注意,您需要支付 DynamoDB 导出的额外费用和 Simple Storage Service (Amazon S3) 存储成本。任务运行完成后,Simple Storage Service (Amazon S3) 中的导出数据仍然存在,因此您无需其他 DynamoDB 导出即可重复使用这些数据。使用此连接器的一个要求是该表启用了 point-in-time 恢复 (PITR)。

DynamoDB ETL 连接器或导出连接器不支持在 DynamoDB 源应用筛选条件或下推谓词。

以下代码示例演示如何进行读取(通过导出连接器)以及打印分区数量。

Python import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "", "dynamodb.s3.bucket": "", "dynamodb.s3.prefix": "", "dynamodb.s3.bucketOwner": "", } ) print(dyf.getNumPartitions()) job.commit() Scala import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "", "dynamodb.s3.bucket" -> "", "dynamodb.s3.prefix" -> "", "dynamodb.s3.bucketOwner" -> "", )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) Job.commit() } }

以下示例演示如何从具有 dynamodb 分类的 AWS Glue 数据目录表进行读取(通过导出连接器)以及打印分区数量:

Python import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dynamicFrame = glue_context.create_dynamic_frame.from_catalog( database="", table_name=" 0 limit 1" val connectionOptions = JsonOptions(Map( "url" -> url, "dbtable" -> table, "user" -> user, "password" -> password, "sampleQuery" -> query )) val dyf = glueContext.getSource("mysql", connectionOptions) .getDynamicFrame() 例 将 sampleQuery 与 JDBC 分区一起使用

以下代码示例演示了如何将 sampleQuery 与 JDBC 分区一起使用。

//note that the query should end with "where" or "and" if use with JDBC partitioning. val query = "select name from $tableName where age > 0 and" //Enable JDBC partitioning by setting hashfield. //to use sampleQuery with partitioning, set enablePartitioningForSampleQuery. //use sampleSize to limit the size of returned data. val connectionOptions = JsonOptions(Map( "url" -> url, "dbtable" -> table, "user" -> user, "password" -> password, "hashfield" -> primaryKey, "sampleQuery" -> query, "enablePartitioningForSampleQuery" -> true, "sampleSize" -> "1" )) val dyf = glueContext.getSource("mysql", connectionOptions) .getDynamicFrame()

如果是 Amazon Redshift 连接类型,用于 JDBC 的连接选项中包含的所有其他选项中包含的所有其他选项名称/值对(包括格式选项)将直接传递到底层 SparksQL 连接类型 DataSource。有关更多信息,请参阅适用于 Spark 的 Amazon Redshift 数据源。

以下代码示例演示了如何使用自定义 JDBC 驱动程序读取和写入 JDBC 数据库。这些示例演示了如何从一个版本的数据库产品读取和写入同一产品的更高版本。

Python import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext, SparkConf from awsglue.context import GlueContext from awsglue.job import Job import time from pyspark.sql.types import StructType, StructField, IntegerType, StringType sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session # Construct JDBC connection options connection_mysql5_options = { "url": "jdbc:mysql://:3306/db", "dbtable": "test", "user": "admin", "password": "pwd"} connection_mysql8_options = { "url": "jdbc:mysql://:3306/db", "dbtable": "test", "user": "admin", "password": "pwd", "customJdbcDriverS3Path": "s3://path/mysql-connector-java-8.0.17.jar", "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"} connection_oracle11_options = { "url": "jdbc:oracle:thin:@//:1521/ORCL", "dbtable": "test", "user": "admin", "password": "pwd"} connection_oracle18_options = { "url": "jdbc:oracle:thin:@//:1521/ORCL", "dbtable": "test", "user": "admin", "password": "pwd", "customJdbcDriverS3Path": "s3://path/ojdbc10.jar", "customJdbcDriverClassName": "oracle.jdbc.OracleDriver"} # Read from JDBC databases with custom driver df_mysql8 = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql8_options) # Read DynamicFrame from MySQL 5 and write to MySQL 8 df_mysql5 = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql5_options) glueContext.write_from_options(frame_or_dfc=df_mysql5, connection_type="mysql", connection_options=connection_mysql8_options) # Read DynamicFrame from Oracle 11 and write to Oracle 18 df_oracle11 = glueContext.create_dynamic_frame.from_options(connection_type="oracle", connection_options=connection_oracle11_options) glueContext.write_from_options(frame_or_dfc=df_oracle11, connection_type="oracle", connection_options=connection_oracle18_options) Scala import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamicFrame import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { val MYSQL_5_URI: String = "jdbc:mysql://:3306/db" val MYSQL_8_URI: String = "jdbc:mysql://:3306/db" val ORACLE_11_URI: String = "jdbc:oracle:thin:@//:1521/ORCL" val ORACLE_18_URI: String = "jdbc:oracle:thin:@//:1521/ORCL" // Construct JDBC connection options lazy val mysql5JsonOption = jsonOptions(MYSQL_5_URI) lazy val mysql8JsonOption = customJDBCDriverJsonOptions(MYSQL_8_URI, "s3://path/mysql-connector-java-8.0.17.jar", "com.mysql.cj.jdbc.Driver") lazy val oracle11JsonOption = jsonOptions(ORACLE_11_URI) lazy val oracle18JsonOption = customJDBCDriverJsonOptions(ORACLE_18_URI, "s3://path/ojdbc10.jar", "oracle.jdbc.OracleDriver") def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // Read from JDBC database with custom driver val df_mysql8: DynamicFrame = glueContext.getSource("mysql", mysql8JsonOption).getDynamicFrame() // Read DynamicFrame from MySQL 5 and write to MySQL 8 val df_mysql5: DynamicFrame = glueContext.getSource("mysql", mysql5JsonOption).getDynamicFrame() glueContext.getSink("mysql", mysql8JsonOption).writeDynamicFrame(df_mysql5) // Read DynamicFrame from Oracle 11 and write to Oracle 18 val df_oracle11: DynamicFrame = glueContext.getSource("oracle", oracle11JsonOption).getDynamicFrame() glueContext.getSink("oracle", oracle18JsonOption).writeDynamicFrame(df_oracle11) Job.commit() } private def jsonOptions(url: String): JsonOptions = { new JsonOptions( s"""{"url": "${url}", |"dbtable":"test", |"user": "admin", |"password": "pwd"}""".stripMargin) } private def customJDBCDriverJsonOptions(url: String, customJdbcDriverS3Path: String, customJdbcDriverClassName: String): JsonOptions = { new JsonOptions( s"""{"url": "${url}", |"dbtable":"test", |"user": "admin", |"password": "pwd", |"customJdbcDriverS3Path": "${customJdbcDriverS3Path}", |"customJdbcDriverClassName" : "${customJdbcDriverClassName}"}""".stripMargin) } } 注意

AWS Glue 作业在一次运行期间仅与一个子网关联。这可能会影响您使用同一作业连接到多个数据来源。此行为不仅限于 JDBC 源。

自定义和 AWS Marketplace connectionType 值

这些功能包括:

"connectionType": "marketplace.athena":指定与 Amazon Athena 数据存储的连接。连接使用来自 AWS Marketplace 的连接器。

"connectionType": "marketplace.spark":指定与 Apache Spark 数据存储的连接。连接使用来自 AWS Marketplace 的连接器。

"connectionType": "marketplace.jdbc":指定与 JDBC 数据存储的连接。连接使用来自 AWS Marketplace 的连接器。

"connectionType": "custom.athena":指定与 Amazon Athena 数据存储的连接。连接使用您上传到 AWS Glue Studio 的自定义连接器。

"connectionType": "custom.spark":指定与 Apache Spark 数据存储的连接。连接使用您上传到 AWS Glue Studio 的自定义连接器。

"connectionType": "custom.jdbc":指定与 JDBC 数据存储的连接。连接使用您上传到 AWS Glue Studio 的自定义连接器。

适用于类型 custom.jdbc 或 marketplace.jdbc 的连接选项

className – 字符串,必需,驱动程序类名称。

connectionName – 字符串,必需,与连接器关联的连接的名称。

url – 字符串,必需,用于建立与数据源的连接且带占位符(${})的 JDBC URL。占位符 ${secretKey} 替换为 AWS Secrets Manager 中同名的密钥。有关构建 URL 的详细信息,请参阅数据存储文档。

secretId 或 user/password – 字符串,必需,用于检索 URL 的凭证。

dbTable 或 query – 字符串,必需,从中获取数据的表或 SQL 查询。您可以指定 dbTable 或 query,但不能同时指定两者。

partitionColumn – 字符串,可选,用于分区的整数列的名称。此选项仅在包含 lowerBound、upperBound 和 numPartitions 时有效。此选项的工作方式与 Spark SQL JDBC 阅读器中的工作方式相同。有关更多信息,请参阅《Apache Spark SQL DataFrames 和《数据集指南》中的 JDB 转换到其他数据库。

lowerBound 和 upperBound 值用于确定分区步长,而不是用于筛选表中的行。对表中的所有行进行分区并返回。

注意

使用查询(而不是表名称)时,您应验证查询是否适用于指定的分区条件。例如:

如果您的查询格式为 "SELECT col1 FROM table1",则在使用分区列的查询结尾附加 WHERE 子句,以测试查询。

如果您的查询格式为 SELECT col1 FROM table1 WHERE col2=val",则通过 AND 和使用分区列的表达式扩展 WHERE 子句,以测试查询。

lowerBound – 整数,可选,用于确定分区步长的最小 partitionColumn 值。

upperBound – 整数,可选,用于确定分区步长的最大 partitionColumn 值。

numPartitions – 整数,可选,分区数。此值以及 lowerBound(包含)和 upperBound(排除)为用于拆分 partitionColumn 而生成的 WHERE 子句表达式构成分区步长。

重要

请注意分区的数量,因为分区过多可能会导致外部数据库系统出现问题。

filterPredicate – 字符串,可选,用于筛选源数据的额外条件子句。例如:

BillingCity='Mountain View'

使用查询(而不是表名称)时,您应验证查询是否适用于指定的 filterPredicate。例如:

如果您的查询格式为 "SELECT col1 FROM table1",则在使用筛选条件谓词的查询结尾附加 WHERE 子句,以测试查询。

如果您的查询格式为 "SELECT col1 FROM table1 WHERE col2=val",则通过 AND 和使用筛选条件谓词的表达式扩展 WHERE 子句,以测试查询。

dataTypeMapping – 目录,可选,用于构建从 JDBC 数据类型到 Glue 数据类型的映射的自定义数据类型映射。例如,选项 "dataTypeMapping":{"FLOAT":"STRING"} 会通过调用驱动程序的 ResultSet.getString() 方法,将 JDBC 类型 FLOAT 的数据字段映射到 Java String 类型,并将其用于构建 AWS Glue 记录。ResultSet 对象由每个驱动程序实现,因此行为特定于您使用的驱动程序。请参阅 JDBC 驱动程序的文档,了解驱动程序执行转换的方式。

目前受支持的 AWS Glue 数据类型包括:

DATE

STRING

TIMESTAMP

INT

FLOAT

LONG

BIGDECIMAL

BYTE

SHORT

DOUBLE

支持的 JDBC 数据类型为 Java8 java.sql.types。

默认数据类型映射(从 JDBC 到 AWS Glue)如下:

DATE -> DATE

VARCHAR -> STRING

CHAR -> STRING

LONGNVARCHAR -> STRING

TIMESTAMP -> TIMESTAMP

INTEGER -> INT

FLOAT -> FLOAT

REAL -> FLOAT

BIT -> BOOLEAN

BOOLEAN -> BOOLEAN

BIGINT -> LONG

DECIMAL -> BIGDECIMAL

NUMERIC -> BIGDECIMAL

TINYINT -> SHORT

SMALLINT -> SHORT

DOUBLE -> DOUBLE

如果将自定义数据类型映射与选项 dataTypeMapping 结合使用,则可以覆盖默认数据类型映射。只有 dataTypeMapping 选项中列出的 JDBC 数据类型会受到影响;默认映射适用于所有其他 JDBC 数据类型。如果需要,您可以为其他 JDBC 数据类型添加映射。如果默认映射或自定义映射中均未包含 JDBC 数据类型,则数据类型默认转换为 AWS Glue STRING 数据类型。

以下 Python 代码示例演示了如何使用 AWS Marketplace JDBC 驱动程序从 JDBC 数据库读取数据。它演示了如何从数据库读取数据并将数据写入 S3 位置。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3:///", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3:///", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit() 适用于类型 custom.athena 或 marketplace.athena 的连接选项

className – 字符串,必需,驱动程序类名称。当您使用 AthenaCloudWatch 连接器时,此参数值是类名称(例如)的前缀(例如"com.amazonaws.athena.connectors")的前缀。AthenaCloudWatch 连接器由两个类组成:元数据处理程序和记录处理程序。如果您在此处提供通用前缀,则 API 会根据该前缀加载正确的类。

tableName— 字符串,必需,要读取 CloudWatch 的日志流的名称。此代码段使用特别视图名称 all_log_streams,这意味着返回的动态数据框将包含日志组中所有日志流的数据。

schemaName— 字符串,必需,要从中读取数据。 CloudWatch 例如,/aws-glue/jobs/output。

connectionName – 字符串,必需,与连接器关联的连接的名称。

有关此连接器的其他选项,请参阅上的 Amazon Amazon Amazon Athena CloudWatch 连接器自述文件 GitHub。

以下 Python 代码示例演示了如何从使用 AWS Marketplace 连接器的 Athena 数据存储读取数据。它演示了如何从 Athena 读取数据并将数据写入 S3 位置。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3:///", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3:///", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit() 适用于类型 custom.spark 或 marketplace.spark 的连接选项

className – 字符串,必需,连接器类名称。

secretId – 字符串,可选,用于检索连接器连接的凭证。

connectionName – 字符串,必需,与连接器关联的连接的名称。

其他选项取决于数据存储。例如, OpenSearch 配置选项以前缀es,正如适用于 Apache Hadoop 的 Elasticsearch 文档中所述。Spark 与 Snowflake 的连接使用 sfUser 和 sfPassword 等连接,正如《连接 Snowflake》指南中的使用 Spark 连接器所述。

以下 Python 代码示例演示了如何从使用marketplace.spark连接的 OpenSearch 数据存储读取数据。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3:///", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3:///", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3